package com.huan.flink;

import com.huan.flink.map.Product;
import com.huan.flink.map.ProductMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * flink 中窗口的使用 - reduce 增量聚合的使用
 *
 * @author huan.fu
 * @date 2024/1/6 - 11:36
 */
public class FlinkWindowReduceApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为2
        environment.setParallelism(2);

        environment.socketTextStream("localhost", 9999)
                .map(new ProductMapFunction())
                .keyBy((KeySelector<Product, Integer>) Product::getProductId)
                // 基于 "处理时间" 语义的 滚动 窗口，窗口大小为10s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                /**
                 * 1、在一个窗口周期内：相同 key 的第一条数据来的时候，不会调用reduce方法。
                 * 2、增量聚合：数据来一条，就会进行计算一次，但是不会输出
                 * 3、在窗口结束的时候，才会触发输出
                 * 4、reduce 可以解决大多数归约聚合的问题，但是这个接口有一个限制，就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
                 */
                .reduce((ReduceFunction<Product>) (p1, p2) -> {
                    System.out.println("执行了 reduce 操作 p1: " + p1 + ", p2: " + p2);

                    // 进行增量聚合操作 productId 和 productName 取第一个对象的值 createTime 取最后一个对象的值
                    Product product = new Product();
                    product.setProductId(p1.getProductId());
                    product.setProductName(p1.getProductName());
                    product.setCreateTime(p2.getCreateTime());
                    return product;
                })
                .print();

        environment.execute("window api");
    }
}
